在上一篇介紹了Reactor提供Scheduler來幫助開發者,這篇就是來說明具體是如何使用。
執行的方式與一般的operator一樣,會影響從publishOn
以下的operator chain,改變其threading context,也就是改變執行緒,直到如果有下一個publishOn
出現。
官方提供的範例,新增一個parallel-scheduler
,在主流程裡面宣告Flux
,最後新開一個Thread
來subscribe()
,這樣在publishOn
之前的操作都會是new Thread
裡面去執行,之後的則會是一開始宣告的Scheduler
裡面。
Scheduler s = Schedulers.newParallel("parallel-scheduler", 2);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.publishOn(s)
.map(i -> "value " + i);
Thread thread = new Thread(() -> flux.subscribe(System.out::println));
thread.start();
Thread.sleep(100);
另一篇Spring blog 提供的範例我覺得是可以更直覺的了解使用情境,假設需要去呼叫外部的一個阻斷式(blocking)的服務,如果沒有使用publishOn
特別指定Scheduler
,最後執行都會是在subscribe所處在的執行緒(main)裡面,就會等ABC處理完才會處理DE。
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
/*
main from first list, got A
main from first list, got B
main from first list, got C
main from second list, got D
main from second list, got E
*/
這時候如果加上publishOn
,還記得上一篇介紹過最適合用於阻斷式服務的就是boundedElastic
,就可以看到結果是穿插的,效能也就相對的更好。
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
/*
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
*/
跟publishOn
幾乎一模一樣,會去改變operator chain的threading context,差別在於publishOn
只會改變之後的,不會朔及既往,而subscribeOn
則是從頭到尾都改變,不論在哪一個位置,直到遇到publishOn
為止,也就是如果有publishOn
,那之後的operator 仍然會依照publishOn
所指定的Scheduler
。將上一個範例直接拿來改成使用subscribeOn
,那兩個map都會是在一開始宣告的Scheduler
內。
Scheduler s = Schedulers.newParallel("parallel-scheduler", 2);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.subscribeOn(s)
.map(i -> "value " + i);
Thread thread = new Thread(() -> flux.subscribe(System.out::println));
thread.start();
Thread.sleep(100);
至於為什麼有了publishOn
還需要subscribeOn
,理論上你只需要將publishOn
放在最前面就能夠取代subscribeOn
,而且我認為即便是放後面結果一樣,為了可讀性subscribeOn
應該也要放在最前面。其實原因就是有些情境當你沒辦法放在最前面的時候,假設有一個api或是function是別人寫好的你無法去更改,如果只有publishOn
是無法更改到上一層(upstream),這時候就需要靠subscribeOn
來處理。
最後還有一件事情從main thread
跳到其他的執行緒可以透過以上的方法,但是從其他執行緒想要再跳回main
是不可能的,雖然我也無法理解為何會有這樣的需求。